iT邦幫忙

2024 iThome 鐵人賽

DAY 10
0
自我挑戰組

30 天程式學習筆記:我的自學成長之路系列 第 10

[DAY 10] Airflow 實戰演練(1):從網頁爬蟲到資料庫儲存與郵件通知

  • 分享至 

  • xImage
  •  

在了解了 Airflow 的基本概念後,讓我們來看一個實際的例子,學習如何使用 Airflow 構建一個簡單的工作流程。在本篇範例中,我們將會創建一個 DAG,用於抓取網站數據、將數據存儲到資料庫中,並發送電子郵件通知。

Airflow範例

預覽圖:

https://ithelp.ithome.com.tw/upload/images/20240903/20167760rfIqefUoSc.png

1. 定義 DAG 的基本參數

default_args = {
    'owner': 'abc',
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
    'email': ['abc@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}
  • owner:指定 DAG 的擁有者。
  • start_date:指定 DAG 的開始執行日期。
  • retries:指定任務失敗後的重試次數。這裡設置為 0,表示不重試。
  • email:指定通知用的電子郵件地址。
  • email_on_failureemail_on_retry:分別指定是否在任務失敗或重試時發送電子郵件通知。這裡都設置為 False

2. 定義 DAG

dag = DAG(
    'dag_template',
    default_args=default_args,
    description='dag_template',
    schedule_interval=timedelta(days=1),  # 每天執行一次
    catchup=False,
)
  • dag_template:DAG 的唯一標識符號,名稱為 'dag_template'
  • default_args:前面定義的參數作為 DAG 的默認參數。
  • description:描述這個 DAG 的功能。
  • schedule_interval:設置 DAG 的排程間隔時間,這裡設定為每天執行一次。
  • catchup:設置為 False,表示如果有過去未執行的任務不會被補執行。

3. 定義任務(Tasks)

3.1. 爬蟲任務(PythonOperator)

def crawl_and_store_to_csv():
    pass

crawl_and_store_csv_task = PythonOperator(
    task_id='crawl_and_store_csv',
    python_callable=crawl_and_store_to_csv,
    dag=dag,
)
  • crawl_and_store_to_csv:這是一個 Python 函數,模擬爬蟲操作並將結果存儲到 CSV 檔案中。實際業務邏輯應該在這個函數內實現。
  • PythonOperator:Airflow 的 Python 操作器,用於執行 Python 代碼。
  • task_id:該任務的唯一標識符號。
  • python_callable:指定要執行的 Python 函數。

3.2. 建立資料表任務(PostgresOperator)

create_data_table_task = PostgresOperator(
    task_id='create_data_table',
    postgres_conn_id="postgres_conn",
    sql='''
            CREATE TABLE IF NOT EXISTS a_table (
            id SERIAL PRIMARY KEY,
            title VARCHAR(255),
            subtitle VARCHAR(255),
            );
            ''',
    dag=dag,
)
  • PostgresOperator:Airflow 的 PostgreSQL 操作器,用於執行 SQL 指令。
  • postgres_conn_id:指定 Airflow 中已配置的 PostgreSQL 連接 ID。
  • sql:要執行的 SQL 語句,這裡用來創建一個名為 a_table 的資料表。

3.3. 發送電子郵件任務(EmailOperator)

send_email_task = EmailOperator(
    task_id='send_email',
    to=['efg@gmail.com'],
    subject='Airflow - 匯出報告',
    html_content='<p>Your Airflow job has finished.</p><p>Date: {{ execution_date }}</p>',
    files=['/opt/airflow/Downloads/report_20230619010550.pdf'],
    dag=dag
)
  • EmailOperator:Airflow 的電子郵件操作器,用於發送電子郵件。
  • task_id:該任務的唯一標識符號。
  • to:收件人的電子郵件地址。
  • subject:電子郵件的主題。
  • html_content:電子郵件的 HTML 內容,支持 Airflow 的模板變數,如 {{ execution_date }}
  • files:指定要附加到電子郵件中的檔案。

4. 設置 DAG 的任務依賴關係

crawl_and_store_csv_task >> create_data_table_task >> send_email_task
  • 這一行表示任務的執行順序:首先執行 crawl_and_store_csv_task(爬蟲任務),完成後執行 create_data_table_task(創建資料表),最後執行 send_email_task(發送電子郵件)。

結語

這個 DAG 實現了一個簡單的工作流程:

  1. 執行爬蟲腳本,將數據保存為 CSV 檔案。
  2. 在 PostgreSQL 資料庫中創建一個資料表。
  3. 任務完成後,發送電子郵件通知,並附加一個報告檔案。

明天會附上完整的程式碼,大家可以期待一下!


上一篇
[DAY 9] Python 開發者的福音: Airflow 幫你輕鬆駕馭工作流程
下一篇
[DAY 11] Airflow 實戰演練(2):完整 Python 程式碼實現數據工作流程
系列文
30 天程式學習筆記:我的自學成長之路30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言